package com.xiaofan

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import java.time.Duration
import java.util
import scala.collection.mutable.ListBuffer


/**
 * Entry point of the login-failure detection job.
 *
 * Reads login records from a CSV text file, assigns event-time timestamps and watermarks,
 * keys the stream by user id and prints every [[LoginFailWarning]] produced by
 * [[LoginFailWarningAdvanceResult]].
 *
 * Usage: `LoginFailAdvance [inputPath]` -- when no argument is given, the bundled sample
 * file location (`DefaultInputPath`) is used.
 */
object LoginFailAdvance {

  // Location of the sample data; used only when no path is supplied on the command line.
  private val DefaultInputPath: String =
    "D:\\big-data\\code\\UserBehaviorAnalysis\\LoginFailDetect\\src\\main\\resources\\LoginLog.csv"

  /**
   * Builds and runs the Flink job.
   *
   * @param args optional; `args(0)` is the path of the login log CSV file to read
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Allow the input location to be overridden instead of being tied to one machine.
    val inputPath: String = args.headOption.getOrElse(DefaultInputPath)
    val inputStream: DataStream[String] = env.readTextFile(inputPath)

    // Tolerate events arriving up to 3 seconds out of order.
    // The record timestamp is multiplied by 1000 (presumably seconds -> milliseconds; confirm
    // against the data file).
    val watermarkStrategy: WatermarkStrategy[LoginEvent] = WatermarkStrategy
      .forBoundedOutOfOrderness[LoginEvent](Duration.ofSeconds(3))
      .withTimestampAssigner(new SerializableTimestampAssigner[LoginEvent] {
        override def extractTimestamp(element: LoginEvent, recordTimestamp: Long): Long =
          element.timestamp * 1000L
      })

    // Each line is split on commas into four fields; the first and last are parsed as Long.
    val dataStream: DataStream[LoginEvent] = inputStream
      .map { line =>
        val fields: Array[String] = line.split(",")
        LoginEvent(fields(0).toLong, fields(1), fields(2), fields(3).toLong)
      }
      .assignTimestampsAndWatermarks(watermarkStrategy)

    // Detect consecutive login failures within 2 seconds per user and emit a warning.
    val loginFailWarningStream: DataStream[LoginFailWarning] = dataStream
      .keyBy(_.userId)
      .process(new LoginFailWarningAdvanceResult(2))

    loginFailWarningStream.print()
    env.execute("login fail detect job")
  }
}


/**
 * Emits a [[LoginFailWarning]] as soon as a user produces `failTimes` consecutive failed
 * logins in which the last failure's timestamp is less than `withinSeconds` after the first
 * one. Any non-"fail" event resets the run of failures for that user.
 *
 * With `failTimes = 2` and the default window this is exactly "two consecutive failures
 * less than 2 seconds apart".
 *
 * NOTE(review): events are compared in arrival order. A late event carrying an earlier
 * timestamp than the stored failures would also satisfy the window check -- confirm whether
 * out-of-order input needs separate handling.
 *
 * @param failTimes     number of consecutive failures that triggers a warning; values below 2
 *                      are treated as 2, since a "consecutive" run needs at least two events
 * @param withinSeconds window length, in the same unit as `LoginEvent.timestamp`;
 *                      defaults to 2
 */
class LoginFailWarningAdvanceResult(failTimes: Int, withinSeconds: Long = 2L)
  extends KeyedProcessFunction[Long, LoginEvent, LoginFailWarning] {

  // Effective threshold: never less than two failures.
  private val requiredFails: Int = math.max(failTimes, 2)

  // Per-key state holding the most recent consecutive failures (at most requiredFails - 1).
  lazy val loginFailListState: ListState[LoginEvent] = getRuntimeContext.getListState(new ListStateDescriptor[LoginEvent]("loginFail-list", classOf[LoginEvent]))

  /**
   * Handles one login event of the current key.
   *
   * @param value the incoming login event
   * @param ctx   process-function context (unused)
   * @param out   collector receiving a warning when the failure threshold is reached
   */
  override def processElement(value: LoginEvent, ctx: KeyedProcessFunction[Long, LoginEvent, LoginFailWarning]#Context, out: Collector[LoginFailWarning]): Unit = {
    if (value.eventType == "fail") {
      // Materialise the stored failures and append the current one.
      val recentFails = new util.ArrayList[LoginEvent]()
      val stored: util.Iterator[LoginEvent] = loginFailListState.get().iterator()
      while (stored.hasNext) {
        recentFails.add(stored.next())
      }
      recentFails.add(value)

      // Enough consecutive failures: compare the current one with the first of the run.
      if (recentFails.size() >= requiredFails) {
        val windowStart: LoginEvent = recentFails.get(recentFails.size() - requiredFails)
        if (value.timestamp < windowStart.timestamp + withinSeconds) {
          out.collect(LoginFailWarning(value.userId, windowStart.timestamp, value.timestamp, s"login fail $requiredFails times"))
        }
      }

      // Whether or not a warning was emitted, keep only the newest requiredFails - 1 failures
      // so the next failure is evaluated against a sliding run.
      val keepFrom: Int = math.max(0, recentFails.size() - (requiredFails - 1))
      loginFailListState.update(new util.ArrayList[LoginEvent](recentFails.subList(keepFrom, recentFails.size())))
    } else {
      // Any other event type breaks the run of failures.
      loginFailListState.clear()
    }
  }
}
